Firehoseを使ったコードのテストをLocalStackを使って動かしてみる!
LocalStackでKinesis Firehose検証用のコード書いています。齋藤です。
それでは今回はLocalStackでKinesis Firehoseを動かしてみたいと思います。 Firehoseの連携先はS3です。 ちなみに最後の方にも書いたのですが、LocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。
今回書いたコードはgithub上に置いています。
LocalStackでKinesis Firehoseの連携先S3バケットを作成する
S3のバケットを作成するためにはAmazonS3オブジェクトが必要になってきます。 dd以下のコードでLocalStackに向けたS3クライアントをセットアップします。
AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy")); AmazonS3 s3 = AmazonS3ClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null)) .withPathStyleAccessEnabled(true) // これがないとエラーになります。 .build();
バケットの作成を以下のコードで行います。
s3.createBucket("test");
これでひとまず、LocalStackで使うS3のバケットのセットアップは終了です。
LocalStackでKinesis FirehoseのDelivery Streamを作成します。
LocalStackに向けたKinesis Firehoseのクライアントの作成は以下のコードで行います。
AWSCredentialsProvider dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy")); AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null)) .build();;
先ほど作成したFirehoseのクライアントを使って 以下のコードでDelivery Streamを作成します。
firehose.createDeliveryStream( new CreateDeliveryStreamRequest() .withDeliveryStreamName("testStream") .withS3DestinationConfiguration( new S3DestinationConfiguration() .withBucketARN("arn:aws:s3:::test") // 先ほど作ったS3のバケット名でS3のARNを設定します。 .withPrefix("firehose/") .withRoleARN("arn:aws:iam::dummy:role/dummy"))) // LocalStackにはIAM関連のAPIはないみたいなのでdummy roleです。
RecordをPutしてS3に連携されていることをザッと確認してみます。
RecordをPutします。 このコードは都元ダイスケ氏の記事から持ってきました。
firehose.putRecord( new PutRecordRequest() .withDeliveryStreamName("testStream") .withRecord( new Record() .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)))));
S3からlistObjectを使ってKinesisによって連係されていることを確認します。 アサーションはassertjを使っています。
assertThat(s3.listObjects("test") .getObjectSummaries()).anySatisfy(summary -> { assertThat(summary.getKey()).startsWith("firehose/"); });
S3のお片付け
S3ってobjectがあると削除できないんですね・・・・。初めて知りました。 先にObjectを削除してからBucketを削除します。
@After public void tearDown() { firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream")); s3.listObjects("test") .getObjectSummaries() .forEach(s -> { s3.deleteObject(s.getBucketName(), s.getKey()); }); s3.deleteBucket("test"); }
なお、CLIだとforce deleteできるようです。 ログを見た感じだと中で似たようなことをやっていそうですね。
aws --endpoint http://localhost:4572 s3 rb s3://test --force
delete: s3://test/firehose/28fe77a2-d0ef-4d39-9d6a-1a576709b922
delete: s3://test/firehose/2e4038af-7997-4735-8e2f-2e5f69b7fb88
delete: s3://test/firehose/3038ba8b-df66-46f9-9398-4d7e3ac7182f
delete: s3://test/firehose/8c1720be-e3dd-4faa-b068-d254434c00fe
delete: s3://test/xxxxx
remove_bucket: test
今回書いたコード
今回書いたコードはgithub上に置いています。 こちらにも貼り付けておきます。
public class FirehoseTest { private AmazonKinesisFirehose firehose; private AWSCredentialsProvider dummyProvider; private AmazonS3 s3; @Before public void setup() { System.setProperty("com.amazonaws.sdk.disableCbor", "1"); // LocalStackが対応していないプロトコルを使わないようにする dummyProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummy", "dummy"));; firehose = AmazonKinesisFirehoseClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4573", null)) .build();; s3 = AmazonS3ClientBuilder.standard() .withCredentials(dummyProvider) .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4572", null)) .withPathStyleAccessEnabled(true) .build();; } @Test public void test() { s3.createBucket("test"); firehose.createDeliveryStream( new CreateDeliveryStreamRequest() .withDeliveryStreamName("testStream") .withS3DestinationConfiguration( new S3DestinationConfiguration() .withBucketARN("arn:aws:s3:::test") .withPrefix("firehose/") .withRoleARN("arn:aws:iam::dummy:role/dummy"))); while (true) { try { firehose.describeDeliveryStream(new DescribeDeliveryStreamRequest().withDeliveryStreamName("testStream")); break; } catch (ResourceNotFoundException e) { } } firehose.putRecord( new PutRecordRequest() .withDeliveryStreamName("testStream") .withRecord( new Record() .withData(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8))))); assertThat(s3.listObjects("test") .getObjectSummaries()).anySatisfy(summary -> { assertThat(summary.getKey()).startsWith("firehose/"); }); } @After public void tearDown() { firehose.deleteDeliveryStream(new DeleteDeliveryStreamRequest().withDeliveryStreamName("testStream")); s3.listObjects("test") .getObjectSummaries() .forEach(s -> { s3.deleteObject(s.getBucketName(), s.getKey()); }); s3.deleteBucket("test"); } }
まとめ
いかがだったでしょうか。
今回はKinesis FirehoseをLocalStackを使って動かしてみました。 最近LocalStackを使って検証しているのですが、LocalStack非常に便利ですね!
~今度はElasticsearchをDestinationにした記事を書くつもりです。~
と思ったらLocalStackではFirehoseからRedshiftとElasticsearchへの連携は実装されていないようです。 どうしようテスト。 Firehoseに投げるテストをするだけならS3に連携しておけばいいかなって気もしますね。
辛い 辛くない
Issue報告したら直りました。(というかAPIがなかったみたい)
AWS CLIでLocalStack上のdelivery-streamを削除しようとしたらエラーが返ってきます。
今回自分はDockerでLocalStackを使っているので
--rm
オプションを使ってdocker run
したりやdocker rm
したりして環境を真っさらにしています。。。